// Copyright (c) ZeroC, Inc.

#ifndef DATASTORM_DATASTORM_H
#define DATASTORM_DATASTORM_H

#include "Config.h"
#include "DataStorm/SampleEvent.h"
#include "InternalI.h"
#include "InternalT.h"
#include "Node.h"
#include "Types.h"

#include <regex>

#if defined(__clang__)
#    pragma clang diagnostic push
#    pragma clang diagnostic ignored "-Wshadow-field-in-constructor"
#elif defined(__GNUC__)
#    pragma GCC diagnostic push
#    pragma GCC diagnostic ignored "-Wshadow"
#endif

namespace DataStorm
{
    /// A sample provides information about a data element update.
    /// The Sample template provides access to the key, value as well as additional information such as the event,
    /// timestamp, update tag. Samples are generated and published by writers and received by readers.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string> class Sample
    {
    public:
        /// The type of the sample key.
        using KeyType = Key;

        /// The type of the sample value.
        using ValueType = Value;

        /// The type of the update tag. The update tag type defaults to string if it's not explicitly specified
        /// with the Sample template parameters.
        using UpdateTagType = UpdateTag;

        /// Gets the event associated with the sample.
        /// @return The sample event.
        [[nodiscard]] SampleEvent getEvent() const noexcept;

        /// Gets the key of the sample.
        /// @return The sample key.
        [[nodiscard]] const Key& getKey() const noexcept;

        /// Gets the value of the sample.
        /// Depending on the sample event, the sample value might not always be available. It's the case if the
        /// sample event is Remove where this method will return a default value.
        /// @return The sample value.
        [[nodiscard]] const Value& getValue() const noexcept;

        /// Gets the update tag for the partial update.
        /// This method should only be called if the sample event is PartialUpdate.
        /// @return The update tag.
        [[nodiscard]] UpdateTag getUpdateTag() const;

        /// Gets the timestamp of the sample.
        /// The timestamp is generated by the writer and corresponds to the time of sending.
        /// @return The timestamp.
        [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> getTimeStamp() const noexcept;

        /// Gets the origin of the sample.
        /// The origin of the sample identifies uniquely on the node the writer that created the sample. It's the
        /// name of the writer if a name was explicitly provided on creation of the writer. Otherwise, if no name
        /// was provided, an unique identifier is generated by DataStorm.
        /// @return The origin of the sample.
        [[nodiscard]] const std::string& getOrigin() const noexcept;

        /// Gets the session identifier of the session that received this sample.
        /// This session identifier can be used to retrieve the Ice connection with the node.
        /// @return The session identifier.
        [[nodiscard]] const std::string& getSession() const noexcept;

        /// @private
        Sample(const std::shared_ptr<DataStormI::Sample>&) noexcept;

    private:
        std::shared_ptr<DataStormI::SampleT<Key, Value, UpdateTag>> _impl;
    };

    /// Converts the given sample type vector to a string and add it to the stream.
    /// @param os The output stream
    /// @param types The sample type vector to add to the stream
    /// @return The output stream
    inline std::ostream& operator<<(std::ostream& os, const SampleEventSeq& types)
    {
        Ice::print(os, types);
        return os;
    }

    /// Converts the given sample to a string and add it to the stream. The implementation outputs the sample value.
    /// @param os The output stream
    /// @param sample The sample to add to the stream
    /// @return The output stream
    template<typename K, typename V, typename U>
    std::ostream& operator<<(std::ostream& os, const Sample<K, V, U>& sample)
    {
        os << sample.getValue();
        return os;
    }

    /// The Reader class is used to retrieve samples for a data element.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag> class Reader
    {
    public:
        /// The key type.
        using KeyType = Key;

        /// The value type.
        using ValueType = Value;

        /// Move constructor.
        /// @param reader The reader to move from.
        Reader(Reader&& reader) noexcept;

        /// Destructor.
        /// The destruction of the reader disconnects the reader from the writers.
        ~Reader();

        /// Move assignment operator.
        /// @param reader The reader to remove from.
        /// @return A reference to this reader.
        Reader& operator=(Reader&& reader) noexcept;

        /// Indicates whether or not writers are online.
        /// @return `true` if writers are connected, `false` otherwise.
        [[nodiscard]] bool hasWriters() const noexcept;

        /// Waits for the given number of writers to be online.
        /// @param count The number of writers to wait for.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForWriters(unsigned int count = 1) const;

        /// Waits for writers to be offline.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForNoWriters() const;

        /// Gets the connected writers.
        /// @return The names of the connected writers.
        [[nodiscard]] std::vector<std::string> getConnectedWriters() const;

        /// Gets the keys for which writers are connected to this reader.
        /// @return The keys for which we have writers connected.
        [[nodiscard]] std::vector<Key> getConnectedKeys() const;

        /// Returns all the unread samples.
        /// @return The unread samples.
        [[nodiscard]] std::vector<Sample<Key, Value, UpdateTag>> getAllUnread();

        /// Waits for the given number of unread samples to be available.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForUnread(unsigned int count = 1) const;

        /// Returns whether or not unread samples are available.
        /// @return `true` if there unread samples are queued, `false` otherwise.
        [[nodiscard]] bool hasUnread() const noexcept;

        /// Returns the next unread sample.
        /// @return The unread sample.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        [[nodiscard]] Sample<Key, Value, UpdateTag> getNextUnread();

        /// Calls the given functions to provide the initial set of connected keys and when a key is added or
        /// removed from the set of connected keys. If callback functions are already set, they will be replaced.
        /// The connected keys represent the set of keys for which writers are connected to this reader.
        /// The @p init callback is always called after this method returns to provide the initial set of connected
        /// keys. The @p update callback is called when new keys are added or removed from the set of connected keys.
        /// @param init The function to call with the initial set of connected keys.
        /// @param update The function to call when a key is added or removed from the set.
        void onConnectedKeys(
            std::function<void(std::vector<Key>)> init,
            std::function<void(CallbackReason, Key)> update) noexcept;

        /// Calls the given functions to provide the initial set of connected writers and when a new writer
        /// connects or disconnects. If callback functions are already set, they will be replaced.
        /// The @p init callback is always called after this method returns to provide the initial set of connected
        /// writers. The @p update callback is called when new writers connect or disconnect.
        /// @param init The function to call with the initial set of connected writers.
        /// @param update The function to call when a new writer connects or disconnects.
        void onConnectedWriters(
            std::function<void(std::vector<std::string>)> init,
            std::function<void(CallbackReason, std::string)> update) noexcept;

        /// Calls the given function to provide the initial set of unread samples and when new samples are queued.
        /// If a function is already set, it will be replaced.
        /// The @p init callback is always called after this method returns to provide the initial set of unread
        /// samples. The @p queue callback is called when a new sample is received.
        /// @param init The function to call with the initial set of unread samples.
        /// @param queue The function to call when a new sample is received.
        void onSamples(
            std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
            std::function<void(Sample<Key, Value, UpdateTag>)> queue) noexcept;

    protected:
        /// @private
        Reader(const std::shared_ptr<DataStormI::DataReader>& impl) noexcept : _impl(impl) {}

        /// @private
        std::shared_ptr<DataStormI::DataReader> _impl;
    };

    /// The Writer class is used to write samples for a data element.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag> class Writer
    {
    public:
        /// The key type.
        using KeyType = Key;

        /// The value type.
        using ValueType = Value;

        /// Move constructor.
        /// @param writer The writer to move from.
        Writer(Writer&& writer) noexcept;

        /// Move assignment operator.
        /// @param writer The writer to move from.
        /// @return A reference to this writer.
        Writer& operator=(Writer&& writer) noexcept;

        /// Destructor.
        /// The destruction of the writer disconnects the writer from the readers.
        ~Writer();

        /// Indicates whether or not readers are online.
        /// @return `true` if readers are connected, `false` otherwise.
        [[nodiscard]] bool hasReaders() const noexcept;

        /// Waits for the given number of readers to be online.
        /// @param count The number of readers to wait.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForReaders(unsigned int count = 1) const;

        /// Waits for readers to be offline.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForNoReaders() const;

        /// Gets the connected readers.
        /// @return The names of the connected readers.
        [[nodiscard]] std::vector<std::string> getConnectedReaders() const;

        /// Gets the keys for which readers are connected to this writer.
        /// @return The keys for which we have writers connected.
        [[nodiscard]] std::vector<Key> getConnectedKeys() const;

        /// Gets the last written sample.
        /// @return The last written sample.
        /// @throws std::logic_error If there's no sample.
        [[nodiscard]] Sample<Key, Value, UpdateTag> getLast();

        /// Gets all the written sample kept in the writer history.
        /// @return The sample history.
        [[nodiscard]] std::vector<Sample<Key, Value, UpdateTag>> getAll();

        /// Calls the given functions to provide the initial set of connected keys and when a key is added or
        /// removed from the set of connected keys. If callback functions are already set, they will be replaced.
        /// The connected keys represent the set of keys for which writers are connected to this reader.
        /// The @p init callback is always called after this method returns to provide the initial set of connected
        /// keys. The @p update callback is called when new keys are added or removed from the set of connected keys.
        /// @param init The function to call with the initial set of connected keys.
        /// @param update The function to call when a key is added or removed from the set.
        void onConnectedKeys(
            std::function<void(std::vector<Key>)> init,
            std::function<void(CallbackReason, Key)> update) noexcept;

        /// Calls the given functions to provide the initial set of connected readers and when a new reader
        /// connects or disconnects. If callback functions are already set, they will be replaced.
        /// The @p init callback is always called after this method returns to provide the initial set of connected
        /// readers. The @p update callback is called when new readers connect or disconnect.
        /// @param init The function to call with the initial set of connected readers.
        /// @param update The function to call when a new reader connects or disconnects.
        void onConnectedReaders(
            std::function<void(std::vector<std::string>)> init,
            std::function<void(CallbackReason, std::string)> update) noexcept;

    protected:
        /// @private
        Writer(const std::shared_ptr<DataStormI::DataWriter>& impl) noexcept : _impl(impl) {}

        /// @private
        std::shared_ptr<DataStormI::DataWriter> _impl;
    };

    /// The Topic class.
    /// This class allows constructing reader and writer objects. It's also used to setup filter and updater
    /// functions.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string> class Topic
    {
    public:
        /// The topic's key type.
        using KeyType = Key;

        /// The topic's value type.
        using ValueType = Value;

        /// The topic's update tag type (defaults to std::string if not specified).
        using UpdateTagType = UpdateTag;

        /// The topic's writer type.
        using WriterType = Writer<Key, Value, UpdateTag>;

        /// The topic's reader type.
        using ReaderType = Reader<Key, Value, UpdateTag>;

        /// The topic's sample type.
        using SampleType = Sample<Key, Value, UpdateTag>;

        /// Constructs a new Topic for the topic with the given name.
        /// @param node The node.
        /// @param name The name of the topic.
        Topic(const Node& node, std::string name) noexcept;

        /// Move constructor.
        /// @param topic The topic to move from.
        Topic(Topic&& topic) noexcept
            : _name(std::move(topic._name)),
              _topicFactory(std::move(topic._topicFactory)),
              _keyFactory(std::move(topic._keyFactory)),
              _tagFactory(std::move(topic._tagFactory)),
              _keyFilterFactories(std::move(topic._keyFilterFactories)),
              _sampleFilterFactories(std::move(topic._sampleFilterFactories)),
              _reader(std::move(topic._reader)),
              _writer(std::move(topic._writer)),
              _updaters(std::move(topic._updaters))
        {
        }

        /// Destructor.
        /// The destructor disconnects the topic from peers.
        ~Topic();

        /// Move assignment operator.
        /// @param topic The topic to move from.
        /// @return A reference to this topic.
        Topic& operator=(Topic&& topic) noexcept;

        /// Indicates whether or not data writers are online.
        /// @return `true` if data writers are connected, `false` otherwise.
        [[nodiscard]] bool hasWriters() const noexcept;

        /// Waits for the given number of data writers to be online.
        /// @param count The number of data writers to wait for.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForWriters(unsigned int count = 1) const;

        /// Waits for data writers to be offline.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForNoWriters() const;

        /// Sets the default configuration used to construct readers.
        /// @param config The default writer configuration.
        void setWriterDefaultConfig(const WriterConfig& config) noexcept;

        /// Indicates whether or not data readers are online.
        /// @return `true` if data readers are connected, `false` otherwise.
        [[nodiscard]] bool hasReaders() const noexcept;

        /// Waits for the given number of data readers to be online.
        /// @param count The number of data readers to wait.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForReaders(unsigned int count = 1) const;

        /// Waits for data readers to be offline.
        /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
        void waitForNoReaders() const;

        /// Sets the default configuration used to construct readers.
        /// @param config The default reader configuration.
        void setReaderDefaultConfig(const ReaderConfig& config) noexcept;

        /// Sets an updater function for the given update tag. The function is called when a partial update is
        /// received or sent to compute the new value. The function is provided the latest value and the partial
        /// update. It should return the new value.
        /// @param tag The update tag.
        /// @param updater The updater function.
        template<typename UpdateValue>
        void setUpdater(const UpdateTag& tag, std::function<void(Value&, UpdateValue)> updater) noexcept;

        /// Sets a key filter factory. The given factory function must return a filter function that returns `true` if
        /// the key matches the filter criteria, `false` otherwise.
        /// @param name The name of the key filter.
        /// @param factory The filter factory function.
        template<typename Criteria>
        void setKeyFilter(
            std::string name,
            std::function<std::function<bool(const Key&)>(const Criteria&)> factory) noexcept;

        /// Sets a sample filter factory. The given factory function must return a filter function that returns `true`
        /// if the sample matches the filter criteria, `false` otherwise.
        /// @param name The name of the sample filter.
        /// @param factory The filter factory function.
        template<typename Criteria>
        void setSampleFilter(
            std::string name,
            std::function<std::function<bool(const SampleType&)>(const Criteria&)> factory) noexcept;

    private:
        [[nodiscard]] std::shared_ptr<DataStormI::TopicReader> getReader() const;
        [[nodiscard]] std::shared_ptr<DataStormI::TopicWriter> getWriter() const;
        [[nodiscard]] Ice::CommunicatorPtr getCommunicator() const noexcept;

        template<typename, typename, typename> friend class SingleKeyWriter;
        template<typename, typename, typename> friend class MultiKeyWriter;
        template<typename, typename, typename> friend class SingleKeyReader;
        template<typename, typename, typename> friend class MultiKeyReader;
        template<typename, typename, typename> friend class FilteredKeyReader;

        // These fields are non-const because we move them in the move-assignment operator.
        std::string _name;
        std::shared_ptr<DataStormI::TopicFactory> _topicFactory;
        std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
        std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
        std::shared_ptr<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>> _keyFilterFactories;
        std::shared_ptr<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>> _sampleFilterFactories;

        mutable std::mutex _mutex;
        mutable std::shared_ptr<DataStormI::TopicReader> _reader;
        mutable std::shared_ptr<DataStormI::TopicWriter> _writer;
        mutable std::map<std::shared_ptr<DataStormI::Tag>, DataStormI::Topic::Updater> _updaters;
    };

    /// Filter structure to specify the filter name and criteria value.
    /// @headerfile DataStorm/DataStorm.h
    template<typename T> struct Filter
    {
        /// Constructs a filter structure with the given name and criteria.
        /// @param name The filter name
        /// @param criteria The criteria
        template<typename TT>
        Filter(std::string name, TT&& criteria) noexcept : name(std::move(name)),
                                                           criteria(std::forward<TT>(criteria))
        {
        }

        /// The filter name.
        std::string name;

        /// The filter criteria value.
        T criteria;
    };

    /// The key reader to read the data element associated with a given key.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class SingleKeyReader : public Reader<Key, Value, UpdateTag>
    {
    public:
        /// Constructs a new reader for the given key. The construction of the reader connects the reader to writers
        /// with a matching key.
        /// @param topic The topic.
        /// @param key The key of the data element to read.
        /// @param name The optional reader name.
        /// @param config The reader configuration.
        SingleKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Key& key,
            std::string name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /// Constructs a new reader for the given key and sample filter criteria. The construction of the reader
        /// connects the reader to writers with a matching key. The writer will only send samples matching the
        /// given sample filter criteria to the reader.
        /// @param topic The topic.
        /// @param key The key of the data element to read.
        /// @param sampleFilter The sample filter.
        /// @param name The optional reader name.
        /// @param config The reader configuration.
        template<typename SampleFilterCriteria>
        SingleKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Key& key,
            const Filter<SampleFilterCriteria>& sampleFilter,
            std::string name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /// Move constructor.
        /// @param reader The reader to move from.
        SingleKeyReader(SingleKeyReader&& reader) noexcept;

        /// Move assignment operator.
        /// @param reader The reader to move from.
        /// @return A reference to this reader.
        SingleKeyReader& operator=(SingleKeyReader&& reader) noexcept;
    };

    /// The key reader to read the data element associated with a given set of keys.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class MultiKeyReader : public Reader<Key, Value, UpdateTag>
    {
    public:
        /// Constructs a new reader for the given keys. The construction of the reader connects the reader to
        /// writers with matching keys. If an empty vector of keys is provided, the reader will connect to all the
        /// available writers.
        /// @param topic The topic.
        /// @param keys The keys of the data elements to read.
        /// @param name The optional reader name.
        /// @param config The reader configuration.
        MultiKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const std::vector<Key>& keys,
            std::string name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /// Constructs a new reader for the given keys and sample filter criteria. The construction of the reader
        /// connects the reader to writers with matching keys. If an empty vector of keys is provided, the reader
        /// will connect to all the available writers. The writer will only send samples matching the given sample
        /// filter criteria to the reader.
        /// @param topic The topic.
        /// @param keys The keys of the data elements to read.
        /// @param sampleFilter The sample filter.
        /// @param name The optional reader name.
        /// @param config The reader configuration.
        template<typename SampleFilterCriteria>
        MultiKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const std::vector<Key>& keys,
            const Filter<SampleFilterCriteria>& sampleFilter,
            std::string name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /// Move constructor.
        /// @param reader The reader to move from.
        MultiKeyReader(MultiKeyReader&& reader) noexcept;

        /// Move assignment operator.
        /// @param reader The reader to move from.
        /// @return A reference to this reader.
        MultiKeyReader& operator=(MultiKeyReader&& reader) noexcept;
    };

    /// Creates a key reader for the given topic and key. This helper method deduces the topic Key, Value and
    /// UpdateTag types from the topic argument.
    /// @param topic The topic.
    /// @param key The key.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename K, typename V, typename UT>
    [[nodiscard]] SingleKeyReader<K, V, UT> makeSingleKeyReader(
        const Topic<K, V, UT>& topic,
        const typename Topic<K, V, UT>::KeyType& key,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return SingleKeyReader<K, V, UT>(topic, key, std::move(name), config);
    }

    /// Creates a key reader for the given topic, key and sample filter. This helper method deduces the topic Key
    /// and Value types from the topic argument.
    /// @param topic The topic.
    /// @param key The key.
    /// @param sampleFilter The sample filter.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename SFC, typename K, typename V, typename UT>
    [[nodiscard]] SingleKeyReader<K, V, UT> makeSingleKeyReader(
        const Topic<K, V, UT>& topic,
        const typename Topic<K, V, UT>::KeyType& key,
        const Filter<SFC>& sampleFilter,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return SingleKeyReader<K, V, UT>(topic, key, sampleFilter, std::move(name), config);
    }

    /// Creates a multi-key reader for the given topic. This helper method deduces the topic Key, Value and
    /// UpdateTag types from the topic argument.
    /// The reader will only receive samples for the given set of keys.
    /// @param topic The topic.
    /// @param keys The keys.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename K, typename V, typename UT>
    [[nodiscard]] MultiKeyReader<K, V, UT> makeMultiKeyReader(
        const Topic<K, V, UT>& topic,
        const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, keys, std::move(name), config);
    }

    /// Creates a multi-key reader for the given topic, keys and sample filter. This helper method deduces the
    /// topic Key and Value types from the topic argument.
    /// The reader will only receive samples for the given set of keys.
    /// @param topic The topic.
    /// @param keys The keys.
    /// @param sampleFilter The sample filter.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename SFC, typename K, typename V, typename UT>
    [[nodiscard]] MultiKeyReader<K, V, UT> makeMultiKeyReader(
        const Topic<K, V, UT>& topic,
        const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
        const Filter<SFC>& sampleFilter,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, keys, sampleFilter, std::move(name), config);
    }

    /// Creates an any-key reader for the given topic. This helper method deduces the topic Key, Value and
    /// UpdateTag types from the topic argument.
    /// The reader will receive samples for any keys from the topic.
    /// @param topic The topic.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename K, typename V, typename UT>
    [[nodiscard]] MultiKeyReader<K, V, UT> makeAnyKeyReader(
        const Topic<K, V, UT>& topic,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, {}, std::move(name), config);
    }

    /// Creates an any-key reader for the given topic and sample filter. This helper method deduces the topic Key
    /// and Value types from the topic argument.
    /// The reader will receive samples for the keys from the topic.
    /// @param topic The topic.
    /// @param sampleFilter The sample filter.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename SFC, typename K, typename V, typename UT>
    [[nodiscard]] MultiKeyReader<K, V, UT> makeAnyKeyReader(
        const Topic<K, V, UT>& topic,
        const Filter<SFC>& sampleFilter,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return MultiKeyReader<K, V, UT>(topic, {}, sampleFilter, std::move(name), config);
    }

    /// The filtered reader to read data elements whose key match a given filter.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class FilteredKeyReader : public Reader<Key, Value, UpdateTag>
    {
    public:
        /// Constructs a new reader for the given key filter. The construction of the reader connects the reader to
        /// writers whose key matches the key filter criteria.
        /// @param topic The topic.
        /// @param keyFilter The key filter.
        /// @param name The optional reader name.
        /// @param config The reader configuration.
        /// @throws std::invalid_argument Thrown when the key filter is not registered with the topic or the filter is
        /// invalid.
        template<typename KeyFilterCriteria>
        FilteredKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Filter<KeyFilterCriteria>& keyFilter,
            std::string name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /// Constructs a new reader for the given key filter and sample filter criteria. The construction of the
        /// reader connects the reader to writers whose key matches the key filter criteria.
        /// @param topic The topic.
        /// @param keyFilter The key filter.
        /// @param sampleFilter The sample filter.
        /// @param name The optional reader name.
        /// @param config The reader configuration.
        /// @throws std::invalid_argument Thrown when the key filter is not registered with the topic or the filter is
        /// invalid.
        template<typename KeyFilterCriteria, typename SampleFilterCriteria>
        FilteredKeyReader(
            const Topic<Key, Value, UpdateTag>& topic,
            const Filter<KeyFilterCriteria>& keyFilter,
            const Filter<SampleFilterCriteria>& sampleFilter,
            std::string name = std::string(),
            const ReaderConfig& config = ReaderConfig());

        /// Move constructor
        /// @param reader The reader to move from.
        FilteredKeyReader(FilteredKeyReader&& reader) noexcept;

        /// Move assignment operator.
        /// @param reader The reader to move from.
        /// @return A reference to this reader.
        FilteredKeyReader& operator=(FilteredKeyReader&& reader) noexcept;
    };

    /// Creates a new filtered reader for the given topic and key filter. This helper method deduces the topic Key,
    /// Value and UpdateTag types from the topic argument.
    /// @param topic The topic.
    /// @param filter The key filter.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename KFC, typename K, typename V, typename UT>
    [[nodiscard]] FilteredKeyReader<K, V, UT> makeFilteredKeyReader(
        const Topic<K, V, UT>& topic,
        const Filter<KFC>& filter,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return FilteredKeyReader<K, V, UT>(topic, filter, std::move(name), config);
    }

    /// Creates a new filter reader for the given topic, key filter and sample filter. This helper method deduces
    /// the topic Key, Value and UpdateTag types from the topic argument.
    /// @param topic The topic.
    /// @param keyFilter The key filter.
    /// @param sampleFilter The sample filter.
    /// @param name The optional reader name.
    /// @param config The optional reader configuration.
    template<typename KFC, typename SFC, typename K, typename V, typename UT>
    [[nodiscard]] FilteredKeyReader<K, V, UT> makeFilteredKeyReader(
        const Topic<K, V, UT>& topic,
        const Filter<KFC>& keyFilter,
        const Filter<SFC>& sampleFilter,
        std::string name = std::string(),
        const ReaderConfig& config = ReaderConfig())
    {
        return FilteredKeyReader<K, V, UT>(topic, keyFilter, sampleFilter, std::move(name), config);
    }

    /// The key writer to write the data element associated with a given key.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class SingleKeyWriter : public Writer<Key, Value, UpdateTag>
    {
    public:
        /// Constructs a new writer for the given key. The construction of the writer connects the writer to readers
        /// with a matching key.
        /// @param topic The topic.
        /// @param key The key of the data element to write.
        /// @param name The optional writer name.
        /// @param config The writer configuration.
        SingleKeyWriter(
            const Topic<Key, Value, UpdateTag>& topic,
            const Key& key,
            std::string name = std::string(),
            const WriterConfig& config = WriterConfig());

        /// Move constructor.
        /// @param writer The writer to move from.
        SingleKeyWriter(SingleKeyWriter&& writer) noexcept;

        /// Move assignment operator.
        /// @param writer The writer to move from.
        /// @return A reference to this writer.
        SingleKeyWriter& operator=(SingleKeyWriter&& writer) noexcept;

        /// Adds the data element. This generates a SampleEvent::Add sample with the given value.
        /// @param value The data element value.
        void add(const Value& value);

        /// Updates the data element. This generates a SampleEvent::Update sample with the given value.
        /// @param value The data element value.
        void update(const Value& value);

        /// Gets a partial update generator function for the given partial update tag. When called, the returned
        /// function generates a SampleEvent::PartialUpdate sample with the given partial update value.
        /// The UpdateValue template parameter must match the UpdateValue type used to register the updater with
        /// the Topic::setUpdater method.
        /// @param tag The partial update tag.
        template<typename UpdateValue>
        [[nodiscard]] std::function<void(const UpdateValue&)> partialUpdate(const UpdateTag& tag);

        /// Removes the data element. This generates a SampleEvent::Remove sample.
        void remove() noexcept;

    private:
        const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
    };

    /// The key writer to write data elements associated with a given set of keys.
    /// @headerfile DataStorm/DataStorm.h
    template<typename Key, typename Value, typename UpdateTag = std::string>
    class MultiKeyWriter : public Writer<Key, Value, UpdateTag>
    {
    public:
        /// Constructs a new writer for the given keys. The construction of the writer connects the writer to
        /// readers with matching keys. If an empty vector of keys is provided, the writer will connect to all the
        /// available readers.
        /// @param topic The topic.
        /// @param keys The keys.
        /// @param name The optional writer name.
        /// @param config The writer configuration.
        MultiKeyWriter(
            const Topic<Key, Value, UpdateTag>& topic,
            const std::vector<Key>& keys,
            std::string name = std::string(),
            const WriterConfig& config = WriterConfig());

        /// Move constructor
        /// @param writer The writer to move from.
        MultiKeyWriter(MultiKeyWriter&& writer) noexcept;

        /// Move assignment operator.
        /// @param writer The writer to move from.
        /// @return A reference to this writer.
        MultiKeyWriter& operator=(MultiKeyWriter&& writer) noexcept;

        /// Adds the data element. This generates a SampleEvent::Add sample with the given value.
        /// @param key The key
        /// @param value The data element value.
        void add(const Key& key, const Value& value);

        /// Updates the data element. This generates a SampleEvent::Update sample with the given value.
        /// @param key The key
        /// @param value The data element value.
        void update(const Key& key, const Value& value);

        /// Gets a partial update generator function for the given partial update tag. When called, the returned
        /// function generates a SampleEvent::PartialUpdate sample with the given partial update value.
        /// The UpdateValue template parameter must match the UpdateValue type used to register the updater with
        /// the Topic::setUpdater method.
        /// @param tag The partial update tag.
        template<typename UpdateValue>
        [[nodiscard]] std::function<void(const Key&, const UpdateValue&)> partialUpdate(const UpdateTag& tag);

        /// Removes the data element. This generates a TopicEvent::Remove sample.
        /// @param key The key
        void remove(const Key& key) noexcept;

    private:
        const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
        const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
    };

    /// Creates a key writer for the given topic and key. This helper method deduces the topic Key, Value and
    /// UpdateTag types from the topic argument.
    /// @param topic The topic.
    /// @param key The key.
    /// @param name The optional writer name.
    /// @param config The optional writer configuration.
    template<typename K, typename V, typename UT>
    SingleKeyWriter<K, V, UT> makeSingleKeyWriter(
        const Topic<K, V, UT>& topic,
        const typename Topic<K, V, UT>::KeyType& key,
        std::string name = std::string(),
        const WriterConfig& config = WriterConfig())
    {
        return SingleKeyWriter<K, V, UT>(topic, key, std::move(name), config);
    }

    /// Creates a multi-key writer for the given topic and keys. This helper method deduces the topic Key, Value
    /// and UpdateTag types from the topic argument.
    /// @param topic The topic.
    /// @param keys The keys.
    /// @param name The optional writer name.
    /// @param config The optional writer configuration.
    template<typename K, typename V, typename UT>
    [[nodiscard]] MultiKeyWriter<K, V, UT> makeMultiKeyWriter(
        const Topic<K, V, UT>& topic,
        const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
        std::string name = std::string(),
        const WriterConfig& config = WriterConfig())
    {
        return MultiKeyWriter<K, V, UT>(topic, keys, std::move(name), config);
    }

    /// Creates an any-key writer for the given topic. This helper method deduces the topic Key, Value and
    /// UpdateTag types from the topic argument.
    /// @param topic The topic.
    /// @param name The optional writer name.
    /// @param config The optional writer configuration.
    template<typename K, typename V, typename UT>
    [[nodiscard]] MultiKeyWriter<K, V, UT> makeAnyKeyWriter(
        const Topic<K, V, UT>& topic,
        std::string name = std::string(),
        const WriterConfig& config = WriterConfig())
    {
        return MultiKeyWriter<K, V, UT>(topic, {}, std::move(name), config);
    }

    //
    // Public template based API implementation
    //

    //
    // Sample template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    SampleEvent Sample<Key, Value, UpdateTag>::getEvent() const noexcept
    {
        return _impl->event;
    }

    template<typename Key, typename Value, typename UpdateTag>
    const Key& Sample<Key, Value, UpdateTag>::getKey() const noexcept
    {
        return _impl->getKey();
    }

    template<typename Key, typename Value, typename UpdateTag>
    const Value& Sample<Key, Value, UpdateTag>::getValue() const noexcept
    {
        return _impl->getValue();
    }

    template<typename Key, typename Value, typename UpdateTag>
    UpdateTag Sample<Key, Value, UpdateTag>::getUpdateTag() const
    {
        return _impl->getTag();
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::chrono::time_point<std::chrono::system_clock> Sample<Key, Value, UpdateTag>::getTimeStamp() const noexcept
    {
        return _impl->timestamp;
    }

    template<typename Key, typename Value, typename UpdateTag>
    const std::string& Sample<Key, Value, UpdateTag>::getOrigin() const noexcept
    {
        return _impl->origin;
    }

    template<typename Key, typename Value, typename UpdateTag>
    const std::string& Sample<Key, Value, UpdateTag>::getSession() const noexcept
    {
        return _impl->session;
    }

    template<typename Key, typename Value, typename UpdateTag>
    Sample<Key, Value, UpdateTag>::Sample(const std::shared_ptr<DataStormI::Sample>& impl) noexcept
        : _impl(std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(impl))
    {
    }

    //
    // Reader template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    Reader<Key, Value, UpdateTag>::Reader(Reader<Key, Value, UpdateTag>&& reader) noexcept
        : _impl(std::move(reader._impl))
    {
    }

    template<typename Key, typename Value, typename UpdateTag> Reader<Key, Value, UpdateTag>::~Reader()
    {
        if (_impl)
        {
            _impl->destroy();
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    Reader<Key, Value, UpdateTag>& Reader<Key, Value, UpdateTag>::operator=(Reader&& reader) noexcept
    {
        if (_impl)
        {
            _impl->destroy();
        }
        _impl = std::move(reader._impl);
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Reader<Key, Value, UpdateTag>::hasWriters() const noexcept
    {
        return _impl->hasWriters();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::waitForWriters(unsigned int count) const
    {
        _impl->waitForWriters(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::waitForNoWriters() const
    {
        _impl->waitForWriters(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<std::string> Reader<Key, Value, UpdateTag>::getConnectedWriters() const
    {
        return _impl->getConnectedElements();
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Key> Reader<Key, Value, UpdateTag>::getConnectedKeys() const
    {
        std::vector<Key> keys;
        auto connectedKeys = _impl->getConnectedKeys();
        keys.reserve(connectedKeys.size());
        for (const auto& k : connectedKeys)
        {
            keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
        }
        return keys;
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Sample<Key, Value, UpdateTag>> Reader<Key, Value, UpdateTag>::getAllUnread()
    {
        auto unread = _impl->getAllUnread();
        std::vector<Sample<Key, Value, UpdateTag>> samples;
        samples.reserve(unread.size());
        for (const auto& sample : unread)
        {
            samples.push_back(sample);
        }
        return samples;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::waitForUnread(unsigned int count) const
    {
        _impl->waitForUnread(count);
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Reader<Key, Value, UpdateTag>::hasUnread() const noexcept
    {
        return _impl->hasUnread();
    }

    template<typename Key, typename Value, typename UpdateTag>
    Sample<Key, Value, UpdateTag> Reader<Key, Value, UpdateTag>::getNextUnread()
    {
        return Sample<Key, Value, UpdateTag>(_impl->getNextUnread());
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::onConnectedKeys(
        std::function<void(std::vector<Key>)> init,
        std::function<void(CallbackReason, Key)> update) noexcept
    {
        _impl->onConnectedKeys(
            init ?
            [init = std::move(init)](const std::vector<std::shared_ptr<DataStormI::Key>>& connectedKeys)
            {
                std::vector<Key> keys;
                keys.reserve(connectedKeys.size());
                for(const auto& k : connectedKeys)
                {
                    keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
                }
                init(std::move(keys));
            } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>{},
            update ?
            [update = std::move(update)](CallbackReason action, const std::shared_ptr<DataStormI::Key>& key)
            {
                update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
            } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>{});
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::onConnectedWriters(
        std::function<void(std::vector<std::string>)> init,
        std::function<void(CallbackReason, std::string)> update) noexcept
    {
        _impl->onConnectedElements(std::move(init), std::move(update));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Reader<Key, Value, UpdateTag>::onSamples(
        std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
        std::function<void(Sample<Key, Value, UpdateTag>)> update) noexcept
    {
        auto communicator = _impl->getCommunicator();
        _impl->onSamples(
            init ?
            [communicator, init = std::move(init)](const std::vector<std::shared_ptr<DataStormI::Sample>>& samplesI)
            {
                std::vector<Sample<Key, Value, UpdateTag>> samples;
                samples.reserve(samplesI.size());
                for(const auto& s : samplesI)
                {
                    samples.emplace_back(s);
                }
                init(std::move(samples));
            } : std::function<void(const std::vector<std::shared_ptr<DataStormI::Sample>>&)>(),
            update ?
            [communicator, update = std::move(update)](const std::shared_ptr<DataStormI::Sample>& sampleI)
            {
                update(sampleI);
            } : std::function<void(const std::shared_ptr<DataStormI::Sample>&)>{});
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyReader<Key, Value, UpdateTag>::SingleKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Key& key,
        std::string name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(
              topic.getReader()->create({topic._keyFactory->create(key)}, std::move(name), config))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename SampleFilterCriteria>
    SingleKeyReader<Key, Value, UpdateTag>::SingleKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Key& key,
        const Filter<SampleFilterCriteria>& sampleFilter,
        std::string name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->create(
              {topic._keyFactory->create(key)},
              std::move(name),
              config,
              sampleFilter.name,
              DataStormI::EncoderT<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyReader<Key, Value, UpdateTag>::SingleKeyReader(SingleKeyReader<Key, Value, UpdateTag>&& reader) noexcept
        : Reader<Key, Value, UpdateTag>(std::move(reader))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyReader<Key, Value, UpdateTag>&
    SingleKeyReader<Key, Value, UpdateTag>::operator=(SingleKeyReader&& reader) noexcept
    {
        Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyReader<Key, Value, UpdateTag>::MultiKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const std::vector<Key>& keys,
        std::string name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(
              topic.getReader()->create(topic._keyFactory->create(keys), std::move(name), config))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename SampleFilterCriteria>
    MultiKeyReader<Key, Value, UpdateTag>::MultiKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const std::vector<Key>& keys,
        const Filter<SampleFilterCriteria>& sampleFilter,
        std::string name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->create(
              topic._keyFactory->create(keys),
              std::move(name),
              config,
              sampleFilter.name,
              Encoder<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyReader<Key, Value, UpdateTag>::MultiKeyReader(MultiKeyReader<Key, Value, UpdateTag>&& reader) noexcept
        : Reader<Key, Value, UpdateTag>(std::move(reader))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyReader<Key, Value, UpdateTag>&
    MultiKeyReader<Key, Value, UpdateTag>::operator=(MultiKeyReader&& reader) noexcept
    {
        Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename KeyFilterCriteria>
    FilteredKeyReader<Key, Value, UpdateTag>::FilteredKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Filter<KeyFilterCriteria>& filter,
        std::string name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
              topic._keyFilterFactories->create(filter.name, filter.criteria),
              std::move(name),
              config))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename KeyFilterCriteria, typename SampleFilterCriteria>
    FilteredKeyReader<Key, Value, UpdateTag>::FilteredKeyReader(
        const Topic<Key, Value, UpdateTag>& topic,
        const Filter<KeyFilterCriteria>& keyFilter,
        const Filter<SampleFilterCriteria>& sampleFilter,
        std::string name,
        const ReaderConfig& config)
        : Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
              topic._keyFilterFactories->create(keyFilter.name, keyFilter.criteria),
              std::move(name),
              config,
              sampleFilter.name,
              Encoder<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    FilteredKeyReader<Key, Value, UpdateTag>::FilteredKeyReader(
        FilteredKeyReader<Key, Value, UpdateTag>&& reader) noexcept
        : Reader<Key, Value, UpdateTag>(std::move(reader))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    FilteredKeyReader<Key, Value, UpdateTag>&
    FilteredKeyReader<Key, Value, UpdateTag>::operator=(FilteredKeyReader&& reader) noexcept
    {
        Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
        return *this;
    }

    //
    // Writer template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    Writer<Key, Value, UpdateTag>::Writer(Writer&& writer) noexcept : _impl(std::move(writer._impl))
    {
    }

    template<typename Key, typename Value, typename UpdateTag> Writer<Key, Value, UpdateTag>::~Writer()
    {
        if (_impl)
        {
            _impl->destroy();
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    Writer<Key, Value, UpdateTag>& Writer<Key, Value, UpdateTag>::operator=(Writer&& writer) noexcept
    {
        if (_impl)
        {
            _impl->destroy();
        }
        _impl = std::move(writer._impl);
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Writer<Key, Value, UpdateTag>::hasReaders() const noexcept
    {
        return _impl->hasReaders();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::waitForReaders(unsigned int count) const
    {
        return _impl->waitForReaders(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::waitForNoReaders() const
    {
        return _impl->waitForReaders(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<std::string> Writer<Key, Value, UpdateTag>::getConnectedReaders() const
    {
        return _impl->getConnectedElements();
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Key> Writer<Key, Value, UpdateTag>::getConnectedKeys() const
    {
        std::vector<Key> keys;
        auto connectedKeys = _impl->getConnectedKeys();
        keys.reserve(connectedKeys.size());
        for (const auto& k : connectedKeys)
        {
            keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
        }
        return keys;
    }

    template<typename Key, typename Value, typename UpdateTag>
    Sample<Key, Value, UpdateTag> Writer<Key, Value, UpdateTag>::getLast()
    {
        auto sample = _impl->getLast();
        if (!sample)
        {
            throw std::logic_error("no sample");
        }
        return Sample<Key, Value, UpdateTag>(sample);
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::vector<Sample<Key, Value, UpdateTag>> Writer<Key, Value, UpdateTag>::getAll()
    {
        auto all = _impl->getAll();
        std::vector<Sample<Key, Value, UpdateTag>> samples;
        samples.reserve(all.size());
        for (const auto& sample : all)
        {
            samples.push_back(sample);
        }
        return samples;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::onConnectedKeys(
        std::function<void(std::vector<Key>)> init,
        std::function<void(CallbackReason, Key)> update) noexcept
    {
        _impl->onConnectedKeys(
            init ?
            [init = std::move(init)](const std::vector<std::shared_ptr<DataStormI::Key>>& connectedKeys)
            {
                std::vector<Key> keys;
                keys.reserve(connectedKeys.size());
                for(const auto& k : connectedKeys)
                {
                    keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
                }
                init(std::move(keys));
            } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>{},
            update ?
            [update = std::move(update)](CallbackReason action, const std::shared_ptr<DataStormI::Key>& key)
            {
                update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
            } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>{});
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Writer<Key, Value, UpdateTag>::onConnectedReaders(
        std::function<void(std::vector<std::string>)> init,
        std::function<void(CallbackReason, std::string)> update) noexcept
    {
        _impl->onConnectedElements(std::move(init), std::move(update));
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyWriter<Key, Value, UpdateTag>::SingleKeyWriter(
        const Topic<Key, Value, UpdateTag>& topic,
        const Key& key,
        std::string name,
        const WriterConfig& config)
        : Writer<Key, Value, UpdateTag>(
              topic.getWriter()->create({topic._keyFactory->create(key)}, std::move(name), config)),
          _tagFactory(topic._tagFactory)
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyWriter<Key, Value, UpdateTag>::SingleKeyWriter(SingleKeyWriter<Key, Value, UpdateTag>&& writer) noexcept
        : Writer<Key, Value, UpdateTag>(std::move(writer)),
          _tagFactory(std::move(writer._tagFactory))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    SingleKeyWriter<Key, Value, UpdateTag>&
    SingleKeyWriter<Key, Value, UpdateTag>::operator=(SingleKeyWriter&& writer) noexcept
    {
        Writer<Key, Value, UpdateTag>::operator=(std::move(writer));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void SingleKeyWriter<Key, Value, UpdateTag>::add(const Value& value)
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            nullptr,
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void SingleKeyWriter<Key, Value, UpdateTag>::update(const Value& value)
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            nullptr,
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename UpdateValue>
    std::function<void(const UpdateValue&)> SingleKeyWriter<Key, Value, UpdateTag>::partialUpdate(const UpdateTag& tag)
    {
        auto impl = Writer<Key, Value, UpdateTag>::_impl;
        auto updateTag = _tagFactory->create(tag);
        return [impl, updateTag](const UpdateValue& value)
        {
            auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
            impl->publish(nullptr, std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
        };
    }

    template<typename Key, typename Value, typename UpdateTag>
    void SingleKeyWriter<Key, Value, UpdateTag>::remove() noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            nullptr,
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyWriter<Key, Value, UpdateTag>::MultiKeyWriter(
        const Topic<Key, Value, UpdateTag>& topic,
        const std::vector<Key>& keys,
        std::string name,
        const WriterConfig& config)
        : Writer<Key, Value, UpdateTag>(
              topic.getWriter()->create(topic._keyFactory->create(keys), std::move(name), config)),
          _keyFactory(topic._keyFactory),
          _tagFactory(topic._tagFactory)
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyWriter<Key, Value, UpdateTag>::MultiKeyWriter(MultiKeyWriter<Key, Value, UpdateTag>&& writer) noexcept
        : Writer<Key, Value, UpdateTag>(std::move(writer)),
          _keyFactory(std::move(writer._keyFactory)),
          _tagFactory(std::move(writer._tagFactory))
    {
    }

    template<typename Key, typename Value, typename UpdateTag>
    MultiKeyWriter<Key, Value, UpdateTag>&
    MultiKeyWriter<Key, Value, UpdateTag>::operator=(MultiKeyWriter&& writer) noexcept
    {
        Writer<Key, Value, UpdateTag>::operator=(std::move(writer));
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    void MultiKeyWriter<Key, Value, UpdateTag>::add(const Key& key, const Value& value)
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            _keyFactory->create(key),
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void MultiKeyWriter<Key, Value, UpdateTag>::update(const Key& key, const Value& value)
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            _keyFactory->create(key),
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename UpdateValue>
    std::function<void(const Key&, const UpdateValue&)>
    MultiKeyWriter<Key, Value, UpdateTag>::partialUpdate(const UpdateTag& tag)
    {
        auto impl = Writer<Key, Value, UpdateTag>::_impl;
        auto updateTag = _tagFactory->create(tag);
        auto keyFactory = _keyFactory;
        return [impl, updateTag, keyFactory](const Key& key, const UpdateValue& value)
        {
            auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
            impl->publish(
                keyFactory->create(key),
                std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
        };
    }

    template<typename Key, typename Value, typename UpdateTag>
    void MultiKeyWriter<Key, Value, UpdateTag>::remove(const Key& key) noexcept
    {
        Writer<Key, Value, UpdateTag>::_impl->publish(
            _keyFactory->create(key),
            std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
    }

    /// @private
    template<typename Value> std::function<std::function<bool(const Value&)>(const std::string&)> makeRegexFilter()
    {
        // std::regex's constructor accepts a const string&; it does not accept a string_view.
        return [](const std::string& criteria)
        {
            std::regex expr(criteria);
            return [expr = std::move(expr)](const Value& value)
            {
                std::ostringstream os;
                os << value;
                return std::regex_match(os.str(), expr);
            };
        };
    }

    /// @private
    template<typename Key, typename Value, typename UpdateTag>
    std::function<std::function<bool(const Sample<Key, Value, UpdateTag>&)>(const SampleEventSeq&)>
    makeSampleEventFilter(const Topic<Key, Value, UpdateTag>&)
    {
        return [](const SampleEventSeq& criteria)
        {
            return [criteria](const Sample<Key, Value, UpdateTag>& sample)
            { return std::find(criteria.begin(), criteria.end(), sample.getEvent()) != criteria.end(); };
        };
    }

    /// @private
    template<typename T, typename V, typename Enabler = void> struct RegexFilter
    {
        template<typename F> static void add(const F&) {}
    };

    /// @private
    template<typename T, typename V> struct RegexFilter<T, V, std::enable_if_t<DataStormI::is_streamable<V>::value>>
    {
        template<typename F> static void add(const F& factory)
        {
            factory->set("_regex", makeRegexFilter<T>()); // Only set the _regex filter if the value is streamable
        }
    };

    //
    // Topic template implementation
    //
    template<typename Key, typename Value, typename UpdateTag>
    Topic<Key, Value, UpdateTag>::Topic(const Node& node, std::string name) noexcept
        : _name(std::move(name)),
          _topicFactory(node._factory),
          _keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
          _tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
          _keyFilterFactories(std::make_shared<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>>()),
          _sampleFilterFactories(
              std::make_shared<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>>())
    {
        RegexFilter<Key, Key>::add(_keyFilterFactories);
        RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
        _sampleFilterFactories->set("_event", makeSampleEventFilter(*this));
    }

    template<typename Key, typename Value, typename UpdateTag> Topic<Key, Value, UpdateTag>::~Topic()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_reader)
        {
            _reader->destroy();
        }
        if (_writer)
        {
            _writer->destroy();
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    Topic<Key, Value, UpdateTag>& Topic<Key, Value, UpdateTag>::operator=(Topic<Key, Value, UpdateTag>&& topic) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_reader)
        {
            _reader->destroy();
        }
        if (_writer)
        {
            _writer->destroy();
        }
        _name = std::move(topic._name);
        _topicFactory = std::move(topic._topicFactory);
        _keyFactory = std::move(topic._keyFactory);
        _tagFactory = std::move(topic._tagFactory);
        _keyFilterFactories = std::move(topic._keyFilterFactories);
        _sampleFilterFactories = std::move(topic._sampleFilterFactories);
        _reader = std::move(topic._reader);
        _writer = std::move(topic._writer);
        _updaters = std::move(topic._updaters);
        return *this;
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Topic<Key, Value, UpdateTag>::hasWriters() const noexcept
    {
        return getReader()->hasWriters();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForWriters(unsigned int count) const
    {
        getReader()->waitForWriters(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForNoWriters() const
    {
        getReader()->waitForWriters(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::setReaderDefaultConfig(const ReaderConfig& config) noexcept
    {
        getReader()->setDefaultConfig(config);
    }

    template<typename Key, typename Value, typename UpdateTag>
    bool Topic<Key, Value, UpdateTag>::hasReaders() const noexcept
    {
        return getWriter()->hasReaders();
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForReaders(unsigned int count) const
    {
        getWriter()->waitForReaders(static_cast<int>(count));
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::waitForNoReaders() const
    {
        getWriter()->waitForReaders(-1);
    }

    template<typename Key, typename Value, typename UpdateTag>
    void Topic<Key, Value, UpdateTag>::setWriterDefaultConfig(const WriterConfig& config) noexcept
    {
        getWriter()->setDefaultConfig(config);
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename UpdateValue>
    void Topic<Key, Value, UpdateTag>::setUpdater(
        const UpdateTag& tag,
        std::function<void(Value&, UpdateValue)> updater) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto tagI = _tagFactory->create(std::move(tag));
        auto updaterImpl =
            updater ?
            [updater = std::move(updater)](const std::shared_ptr<DataStormI::Sample>& previous,
                                           const std::shared_ptr<DataStormI::Sample>& next,
                                           const Ice::CommunicatorPtr& communicator)
            {
                Value value;
                if (previous)
                {
                    value = Cloner<Value>::clone(
                        std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
                }
                updater(value, Decoder<UpdateValue>::decode(communicator, next->getEncodedValue()));
                std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(next)->setValue(std::move(value));
            } : std::function<void(const std::shared_ptr<DataStormI::Sample>&,
                                const std::shared_ptr<DataStormI::Sample>&,
                                const Ice::CommunicatorPtr&)>{};

        if (_reader && !_writer)
        {
            _reader->setUpdater(tagI, updaterImpl);
        }
        else if (_writer && !_reader)
        {
            _writer->setUpdater(tagI, updaterImpl);
        }
        else if (_reader && _writer)
        {
            _reader->setUpdater(tagI, updaterImpl);
            _writer->setUpdater(tagI, updaterImpl);
        }
        else
        {
            _updaters[tagI] = updaterImpl;
        }
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename Criteria>
    void Topic<Key, Value, UpdateTag>::setKeyFilter(
        std::string name,
        std::function<std::function<bool(const Key&)>(const Criteria&)> factory) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _keyFilterFactories->set(std::move(name), std::move(factory));
    }

    template<typename Key, typename Value, typename UpdateTag>
    template<typename Criteria>
    void Topic<Key, Value, UpdateTag>::setSampleFilter(
        std::string name,
        std::function<std::function<bool(const SampleType&)>(const Criteria&)> factory) noexcept
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _sampleFilterFactories->set(std::move(name), std::move(factory));
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::shared_ptr<DataStormI::TopicReader> Topic<Key, Value, UpdateTag>::getReader() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_reader)
        {
            auto sampleFactory = std::make_shared<DataStormI::SampleFactoryT<Key, Value, UpdateTag>>();
            _reader = _topicFactory->createTopicReader(
                _name,
                _keyFactory,
                _tagFactory,
                std::move(sampleFactory),
                _keyFilterFactories,
                _sampleFilterFactories);
            _reader->setUpdaters(_writer ? _writer->getUpdaters() : _updaters);
            _updaters.clear();
        }
        return _reader;
    }

    template<typename Key, typename Value, typename UpdateTag>
    std::shared_ptr<DataStormI::TopicWriter> Topic<Key, Value, UpdateTag>::getWriter() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_writer)
        {
            _writer = _topicFactory->createTopicWriter(
                _name,
                _keyFactory,
                _tagFactory,
                nullptr,
                _keyFilterFactories,
                _sampleFilterFactories);
            _writer->setUpdaters(_reader ? _reader->getUpdaters() : _updaters);
            _updaters.clear();
        }
        return _writer;
    }

    template<typename Key, typename Value, typename UpdateTag>
    Ice::CommunicatorPtr Topic<Key, Value, UpdateTag>::getCommunicator() const noexcept
    {
        return _topicFactory->getCommunicator();
    }
}

#if defined(__clang__)
#    pragma clang diagnostic pop
#elif defined(__GNUC__)
#    pragma GCC diagnostic pop
#endif

#endif
